package com.stay4it.rxjava;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import rx.util.async.Async;
import rx.util.async.StoppableObservable;

/**
 * RxJava异步操作符 
 * 
 * 异步操作符是属于可选的rxjava-async模块，需要单独下载jar包
 * http://search.maven.org/#search%7Cga%7C1%7Cio.reactivex.rxjava-async
 * 
 */
public class RxJava10 {

	public static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println("run");
        }

    }
    public static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return 20;
        }

    }
    
    public static class ObservableCallableTask implements Callable<Observable<Integer>> {
    	
    	@Override
    	public Observable<Integer> call() throws Exception {
    		return Observable.just(999);
    	}
    	
    }
	
	static ExecutorService executorService = Executors.newCachedThreadPool();
	
	/**
	 * start 
	 * 解释：它接受一个函数作为参数，调用这个函数获取一个值，然后返回一个会发射这个值给后续观察者的Observable。
	 * 
	 * rxjava-async模块还包含这几个操作符：toAsync, asyncAction, 和asyncFunc,功能类似start。
	 * @throws InterruptedException 
	 */
	private static void test1() throws InterruptedException {
		
		Observable<Integer> observable = Async.start(new Func0<Integer>() {  
	           @Override  
	           public Integer call() {  
	               //函数内为异步操作  
	               try {  
	                   Thread.sleep(3000);  
	               } catch (InterruptedException e) {  
	                   e.printStackTrace();  
	               }  
	               return 20;  
	           }  
	       }); 
		
		observable
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("call value = " + value);
			}
		});

		Thread.sleep(4000); 
	}

	/**
	 * startFuture 
	 * startFuture操作符，传递给它一个返回Future的函数，startFuture会立即调用这个函数获取Future对象，
	 * 然后调用Future的get()方法尝试获取它的值。它返回一个发射这个值给后续观察者的Observable。
	 */
	private static void test2() {
		Future<Integer> future = executorService.submit(new CallableTask());
		
		Observable<Integer> observable = Async.startFuture(new Func0<Future<Integer>>() {

			@Override
			public Future<Integer> call() {
				return future;
			}
		});
		
		observable
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("call value = " + value);
			}
		});
		executorService.shutdown();
	}
	
	/**
	 * deferFuture 
	 * 解释：deferFuture中的函数可以异步执行一些操作，当完成后返回一个Observable，但是这个Observable不会立刻发射数据，直到开始订阅时。 
	 * 
	 */
	private static void test3() {
		
		Future<Observable<Integer>> future = executorService.submit(new ObservableCallableTask());
		
		Observable<Integer> observable = Async.deferFuture(new Func0<Future<Observable<Integer>>>() {

			@Override
			public Future<Observable<Integer>> call() {
				return future;
			}
		});
		
		Observable.just(1, 2, 3, 4, 5, 6)
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("call value = " + value);
				if (value == 4) {
					observable.subscribe(new Action1<Integer>() {

						@Override
						public void call(Integer value) {
							System.out.println("call2 value = " + value);

						}
					});
				}

			}
		});
		
	}
	
	/**
	 * 
	 * fromAction 
	 * 解释：当fromAction()中的函数Action0执行完成后发射数据.
	 * 
	 * 与fromRunnable类似。
	 * @throws InterruptedException 
	 */
	private static void test4() throws InterruptedException {
		Observable<Integer> observable = Async.fromAction(new Action0() {

			@Override
			public void call() {
				for (int i = 1; i < 10; i++) {
					try {
						System.out.println("call " + i);
						Thread.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}, 999);
		
		observable
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("call value = " + value);
			}
		});
		Thread.sleep(6000); 
	}
	
	/**
	 * 
	 * fromCallable 
	 * 解释：当fromCallable中的函数运行完成后将其结果发射出去。
	 * 
	 * @throws InterruptedException 
	 */
	private static void test5() throws InterruptedException {
		//http://blog.csdn.net/nicolelili1/article/details/52181469
		//http://sherlockshi.github.io/page/3/	
		
		Observable<Integer> observable = Async.fromCallable(new Callable<Integer>() {

			@Override
			public Integer call() throws Exception {
				int num = 1;  
				for(int i=1;i<10;i++){  
		            try { 
		                Thread.sleep(500);  
		            } catch (InterruptedException e) {  
		                e.printStackTrace();  
		            }  
		            num *= i;
		        }  
				return num;
			}
		});
		
		observable
		.subscribe(new Action1<Integer>() {
			
			@Override
			public void call(Integer value) {
				System.out.println("call value = " + value);
			}
		});
		Thread.sleep(6000); 
	}

	/**
	 * 
	 * fromRunnable 
	 * 解释：当fromCallable中的函数运行完成后将其结果发射出去。
	 * 
	 * 与fromAction类似。
	 * @throws InterruptedException 
	 */
	private static void test6() throws InterruptedException {
		
		Observable<Integer> observable = Async.fromRunnable(new Runnable() {
			
			@Override
			public void run() {
				for (int i = 1; i < 10; i++) {
					try {
						System.out.println("call " + i);
						Thread.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		},999);
		
		observable
		.subscribe(new Action1<Integer>() {
			
			@Override
			public void call(Integer value) {
				System.out.println("call value = " + value);
			}
		});
		Thread.sleep(6000); 
	}
	
	/**
	 * forEachFuture 
	 * 解释：它其实不算Start操作符的一个变体，而是有一些自己的特点。
	 * 
	 * 传递一些典型的观察者方法（如onNext, onError和onCompleted）给它，Observable会以通常的方式调用它。
	 * 但是forEachFuture自己返回一个Future并且在get()方法处阻塞，直到原始Observable执行完成，然后它返回，
	 * 完成还是错误依赖于原始Observable是完成还是错误。
	 * 
	 * 
	 * 如果你想要一个函数阻塞直到Observable执行完成，可以使用这个操作符。
	 * 
	 * public static <T> FutureTask<Void> forEachFuture(
        Observable<? extends T> source,
        Action1<? super T> onNext,
        Action1<? super Throwable> onError,
        Action0 onCompleted)
	 */
	private static void test7() {
		
		Async.forEachFuture(Observable.just(1, 2, 3, 4, 5), 
				new Action1<Integer>() {

					@Override
					public void call(Integer value) {
						System.out.println("onNext value = " + value);
						
					}
				}, new Action1<Throwable>() {

					@Override
					public void call(Throwable t) {
						System.out.println("onError e = " + t);
						
					}
				}, new Action0() {
					
					@Override
					public void call() {
						System.out.println("onCompleted ");
						
					}
				});
		
	}
	
	/**
	 * runAsync 
	 * 解释：很特殊，返回一个叫做StoppableObservable的特殊Observable。传递一个Action和一个Scheduler给runAsync，
	 * 它返回一个使用这个Action产生数据的StoppableObservable。这个Action接受一个Observable和一个Subscription作为参数，
	 * 它使用Subscription检查unsubscribed条件，一旦发现条件为真就立即停止发射数据。在任何时候你都可以使用unsubscribe方法
	 * 手动停止一个StoppableObservable（这会同时取消订阅与这个StoppableObservable关联的Subscription）。
	 * 
	 * 注意：
	 * 由于runAsync会立即调用Action并开始发射数据，在你创建StoppableObservable之后到你的观察者准备好接受数据之前这段时间里，
	 * 可能会有一部分数据会丢失。
	 * 
	 * @throws InterruptedException 
	 */
	private static void test8() throws InterruptedException {
		
		Action2<Observer<? super Integer>, Subscription> action2 = new Action2<Observer<? super Integer>, Subscription>(){

			@Override
			public void call(Observer<? super Integer> observer, Subscription subscription) {
				int i = 0;
				while (!subscription.isUnsubscribed()) {
					//System.out.println("call i = " + i);  
					observer.onNext(i);
					i++;
					if(i > 4) {
						subscription.unsubscribe();
						observer.onCompleted();
					}
					
					try {  
	                       Thread.sleep(1000);  
	                   } catch (InterruptedException e) {  
	                       e.printStackTrace();  
	                   }  
				}
			}
			
		};
		
		 StoppableObservable<Integer> stoppableObservable  =  Async.runAsync(Schedulers.io(), action2); 
		 
		 stoppableObservable
		 .subscribe(new Subscriber<Integer>() {  
	           @Override  
	           public void onNext(Integer value) {  
	        	   System.out.println("onNext value = " + value);  
	           }  
	  
	           @Override  
	           public void onCompleted() {  
	        	   System.out.println("onCompleted");  
	           }  
	  
	           @Override  
	           public void onError(Throwable e) {  
	        	   System.out.println("onError");  
	           }  
	       });
		 
		 Thread.sleep(5000);
	}
	
	public static void main(String[] args) throws InterruptedException {
		test1();

	}

}
